package com.sean.community.event;

import com.alibaba.fastjson.JSONObject;
import com.qiniu.common.QiniuException;
import com.qiniu.common.Zone;
import com.qiniu.http.Response;
import com.qiniu.storage.Configuration;
import com.qiniu.storage.UploadManager;
import com.qiniu.util.Auth;
import com.qiniu.util.StringMap;
import com.sean.community.entity.DiscussPost;
import com.sean.community.entity.Event;
import com.sean.community.entity.Message;
import com.sean.community.service.DiscussPostService;
import com.sean.community.service.ElasticsearchService;
import com.sean.community.service.MessageService;
import com.sean.community.util.CommunityConstant;
import com.sean.community.util.CommunityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;

/**
 * 消费者
 */
@Component
@SuppressWarnings("all")
public class EventConsumer implements CommunityConstant {
    private Logger logger = LoggerFactory.getLogger(EventConsumer.class);
    private MessageService messageService;
    private DiscussPostService discussPostService;
    private ElasticsearchService elasticsearchService;
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    @Value("${wk.image.command}")
    private String wkCommand;

    @Value("${wk.image.storage}")
    private String wkImageStorage;

    @Value("${qiniu.key.access}")
    private String fileCloudServerAccessKey;    // 文件云服务器，用户身份认证密钥

    @Value("${qiniu.key.secret}")
    private String fileCloudServerSecretKey;    // 文件云服务器，文件加密密钥

    @Value("${qiniu.bucket.share.name}")
    private String fileCloudServerBucketName_share;   // 文件云服务器，头像空间名称

    @Value("${qiniu..bucket.share.url}")
    private String fileCloudServerBucketUrl_share;    // 文件云服务器，头像空间 url

    @Autowired
    public void setMessageService(MessageService messageService) {
        this.messageService = messageService;
    }

    @Autowired
    public void setDiscussPostService(DiscussPostService discussPostService) {
        this.discussPostService = discussPostService;
    }

    @Autowired
    public void setElasticsearchService(ElasticsearchService elasticsearchService) {
        this.elasticsearchService = elasticsearchService;
    }

    @Autowired
    public void setThreadPoolTaskScheduler(ThreadPoolTaskScheduler threadPoolTaskScheduler) {
        this.threadPoolTaskScheduler = threadPoolTaskScheduler;
    }

    /**
     * 处理评论，点赞，关注消息
     */
    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleCLFMessage(ConsumerRecord record){
        if(record == null || record.value() == null){
            logger.error("消息为空");
            return;
        }
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误！");
            return;
        }
        // 发站内信（插入一条数据到 Message 表）
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        // 内容
        Map<String, Object> content = new HashMap<>();
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());
        if(!event.getData().isEmpty()){
            for(Map.Entry<String, Object> entry : event.getData().entrySet()){
                content.put(entry.getKey(), entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }

    /**
     * 消费 es 事件，将帖子同步到 es 服务器中
     */
    @KafkaListener(topics = {TOPIC_PUBLISH})
    public void handlePublishMessage(ConsumerRecord record){
        if(record == null || record.value() == null){
            logger.error("消息为空");
            return;
        }
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误！");
            return;
        }
        DiscussPost discussPost = discussPostService.findDiscussPostById(event.getEntityId());
        elasticsearchService.saveDiscussPost(discussPost);
    }

    /**
     * 消费 es 事件，将帖子从 es 服务器中删除
     */
    @KafkaListener(topics = {TOPIC_DELETE})
    public void handleDeleteMessage(ConsumerRecord record){
        if(record == null || record.value() == null){
            logger.error("消息为空");
            return;
        }
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误！");
            return;
        }
        elasticsearchService.deleteDiscussPost(event.getEntityId());
    }

    /**
     * 消费 es 事件， 生成分享图片
     */
    @KafkaListener(topics = {TOPIC_SHARE})
    public void handleShareImage(ConsumerRecord record){
        if(record == null || record.value() == null){
            logger.error("消息为空");
            return;
        }
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误！");
            return;
        }
        String htmlUrl = (String)event.getData().get("htmlUrl");
        String fileName = (String)event.getData().get("fileName");
        String suffix = (String)event.getData().get("suffix");

        // wkhtmltoimage --qulity 75 htmlUrl d:xxxx/filename.png
        String wkCmd = wkCommand + " --quality 75 "
                + htmlUrl + " " + wkImageStorage + "/" + fileName + suffix;
        try{
            Runtime.getRuntime().exec(wkCmd);           // #1 生成图片是异步的，后面的逻辑会比图片更快执行
            logger.info("生成图片成功! " + wkCmd);
        }catch (IOException e){
            e.printStackTrace();
            logger.error("生成图片失败! " + wkCmd);
        }
        // 把图片上传到文件云服务器
        // 为什么要用定时任务线程池？ 原因见 #1
        // 执行到这里上传文件时，图片都不一定生成完了
        // 所以用异步定时轮询，图片生成就上传
        // 为什么不需要使用 Quartz，为什么不需要考虑分布式部署，因为 kafka 消息消费，天然有一个抢占机制，只有一个服务器会执行

        // 启动定时器，监控图片，一旦生成，就上传
        UploadTask uploadTask = new UploadTask(fileName, suffix);
        Future future = threadPoolTaskScheduler.scheduleAtFixedRate(uploadTask, 500);   // 每隔 500ms 执行一次
        uploadTask.setFuture(future);
    }

    private class UploadTask implements Runnable{
        // 用于寻找文件
        // 文件名
        private String fileName;
        // 文件后缀
        private String suffix;
        // 流程控制
        // 线程返回值
        private Future future;
        // 开始时间
        private long startTime;
        // 上传次数
        private int uploadTimes;

        public UploadTask() {
        }

        public UploadTask(String fileName, String suffix) {
            this.fileName = fileName;
            this.suffix = suffix;
            this.startTime = System.currentTimeMillis();
        }

        public void setFuture(Future future) {
            this.future = future;
        }

        @Override
        public void run() {
            // 生成图片失败
            if(System.currentTimeMillis() - (1000 * 30) > this.startTime){
                logger.error("执行时间过长，终止任务！" + fileName);
                future.cancel(true);
                return;
            }
            // 上传云服务器失败
            if(uploadTimes >= 3){
                logger.error("上传次数过多，终止任务！" + fileName);
                future.cancel(true);
                return;
            }
            String path = wkImageStorage + "/" + fileName + suffix; // 本地文件路径
            File file = new File(path);
            if(file.exists()){
                logger.info(String.format("开始第 %d 次上传[%s].", ++uploadTimes, fileName));
                // 生成分享图片上传云服务器凭证
                // 期待相应
                StringMap policy = new StringMap();
                policy.put("returnBody", CommunityUtil.getJSONString(200));  // 客户端直传云服务器通常采用异步方式，返回 JSON
                // 生成凭证
                Auth auth = Auth.create(fileCloudServerAccessKey, fileCloudServerSecretKey);
                String uploadToken = auth.uploadToken(fileCloudServerBucketName_share, fileName, 3600, policy);
                // 指定上传域名
                UploadManager manager = new UploadManager(new Configuration(Zone.zone2()));
                try {
                    // 开始上传图片
                    Response response = manager.put(
                            path, fileName, uploadToken, null, "image/" + suffix, false
                    );
                    // 处理相应结果
                    JSONObject jsonObject = JSONObject.parseObject(response.bodyString());
                    if(jsonObject == null ||
                            jsonObject.get("code") == null ||
                            !jsonObject.get("code").toString().equals("200")){
                        // 上传失败
                        logger.error(String.format("开始第 %d 次上传失败[%s].", uploadTimes, fileName));
                    }else{
                        logger.info(String.format("开始第 %d 次上传成功[%s].", uploadTimes, fileName));
                        future.cancel(true);
                    }
                }catch (QiniuException e){
                    logger.error(String.format("开始第 %d 次上传失败[%s].", uploadTimes, fileName));
                }
            }else{
                logger.error("等待图片生成 [" + fileName + "].");
            }
        }
    }
}
